-
Notifications
You must be signed in to change notification settings - Fork 957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
misc/multistream-select: Implement simultaneous open extension #2066
Conversation
From the multistream-select 1.0 simultaneous open protocol extension specification: > In order to support direct connections through NATs with hole punching, we need to account for simultaneous open. In such cases, there is no single initiator and responder, but instead both peers act as initiators. This breaks protocol negotiation in multistream-select, which assumes a single initator. > This draft proposes a simple extension to the multistream protocol negotiation in order to select a single initator when both peers are acting as such. See libp2p/specs#196 for details. This commit implements the above specification, available via `Version::V1SimOpen`.
I'm seeing this in our ci logs: seems like it happens when two peers try to dial eachother at the same time. am I correct in assuming this PR will fix that? Looks like I'll have to try this out on Monday. |
Correct, though note that both peers need to run this patch. See https://github.com/libp2p/specs/blob/master/connections/simopen.md for more details.
🎉 please let me know how it goes. |
While trying to reproduce the first log and check that this PR fixes it I ran into a different error: |
Message::NotAvailable => { | ||
*this.state = SeqState::AwaitProtocol { io, protocol } | ||
} | ||
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so currently no fallback to old style in case the peer isn’t ready for V1SimOpen, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's supposed to be backwards compatible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using Version::V1SimOpen
the dialer expects either:
/libp2p/simultaneous-connect
in which case the remote indicates that it operates in dialing mode as well, i.e. a simultaneously dialed connection orna
in which case the remote indicates that it is a listener, i.e. not a simultaneously dialed connection, and thus we fall back to the "old style" viaSeqState::AwaitProtocol
.
Any other message would be a protocol violation, both with and without the simultaneous open extension.
@rkuhn does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that na
bit was the missing puzzle piece, thanks!
loop { | ||
match mem::replace(&mut self.state, SimOpenState::Done) { | ||
SimOpenState::SendNonce { mut io } => { | ||
match Pin::new(&mut io).poll_ready(cx)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn’t this always succeed? (I’m not deep inside libp2p — the last relevant call looks to have been poll_flush
, whose name seems to suggest that the socket should be ready)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in libp2p-quic we're ignoring the flush. a substream is flushed on close, but there is no support in quinn to manually flush a stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-reading the Sink docs I see what you mean, although the docs are still somewhat weird: I was tripped up by the documentation of start_send
which basically implies that poll_flush
is stronger than poll_ready
. And I have a hard time imagining a useful implementation where poll_ready
wouldn’t be immediately ready directly after a poll_flush
.
@mxinden this PR does not fix or fully fix the issue. I have a reproducible test case [0] which uses netsim-embed to simulate a simultaneous open. It's not possible to verify this without netsim-embed, because one swarm will dial first and the second swarm's dial will be rejected due to it having the same five tuple. However with netsim-embed we can simulate two different machines where they dial before receiving the first packet from the peer. In case you want to run it, you'll have to change the path to your local libp2p and then build with |
Looks like just because multistream-select decided that machine #0 is the responder, noise still thinks it's an initiator. Machine-#0 (stdout): Jun 08 12:29:21.575 TRACE multistream_select::protocol: Received message: Responder
Machine-#1 (stdout): Jun 08 12:29:21.575 TRACE multistream_select::protocol: Received message: Initiator
Machine-#0 (stdout): Jun 08 12:29:21.575 DEBUG multistream_select::dialer_select: Dialer: Proposed protocol: /noise
Machine-#1 (stdout): Jun 08 12:29:21.778 TRACE multistream_select::protocol: Received message: Protocol(Protocol(b"/noise"))
Machine-#1 (stdout): Jun 08 12:29:21.779 DEBUG multistream_select::listener_select: Listener: confirming protocol: /noise
Machine-#1 (stdout): Jun 08 12:29:21.779 DEBUG multistream_select::listener_select: Listener: sent confirmed protocol: /noise
Machine-#1 (stdout): Jun 08 12:29:21.780 TRACE libp2p_noise::io::framed: write state Ready
Machine-#1 (stdout): Jun 08 12:29:21.781 TRACE libp2p_noise::io::framed: write: cipher text len = 32 bytes
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write state WriteLen { len: 32, buf: [0, 32], off: 0 }
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write: frame len (32, [0, 32], 0/2)
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write state WriteData { len: 32, off: 0 }
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write: 32/32 bytes written
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write: finished with 32 bytes
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: write state Ready
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: read state: Ready
Machine-#1 (stdout): Jun 08 12:29:21.782 TRACE libp2p_noise::io::framed: read state: ReadLen { buf: [0, 0], off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.981 TRACE multistream_select::protocol: Received message: Protocol(Protocol(b"/noise"))
Machine-#0 (stdout): Jun 08 12:29:21.981 DEBUG multistream_select::dialer_select: Dialer: Received confirmation for protocol: /noise
Machine-#0 (stdout): Jun 08 12:29:21.982 TRACE libp2p_noise::io::framed: write state Ready
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write: cipher text len = 32 bytes
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write state WriteLen { len: 32, buf: [0, 32], off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write: frame len (32, [0, 32], 0/2)
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write state WriteData { len: 32, off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write: 32/32 bytes written
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write: finished with 32 bytes
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: write state Ready
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: read state: Ready
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: read state: ReadLen { buf: [0, 0], off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.983 TRACE libp2p_noise::io::framed: read state: ReadLen { buf: [0, 0], off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.984 TRACE libp2p_noise::io::framed: read: frame len = 32
Machine-#0 (stdout): Jun 08 12:29:21.984 TRACE libp2p_noise::io::framed: read state: ReadData { len: 32, off: 0 }
Machine-#0 (stdout): Jun 08 12:29:21.984 TRACE libp2p_noise::io::framed: read: 32/32 bytes
Machine-#0 (stdout): Jun 08 12:29:21.984 TRACE libp2p_noise::io::framed: read: decrypting 32 bytes
Machine-#0 (stdout): Jun 08 12:29:21.984 DEBUG libp2p_noise::io::framed: read: decryption error
Machine-#0 (stdout): Jun 08 12:29:21.985 DEBUG libp2p_swarm: Connection attempt to PeerId("12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN") via "/ip4/192.168.39.3/tcp/30000/p2p/12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN" failed with Transport(Other(Custom { kind: Other, error: Transport(Custom { kind: Other, error: Other(A(B(Apply(Io(Kind(InvalidData)))))) }) })). Attempts remaining: 0.
Machine-#0 (stdout): Jun 08 12:29:21.985 TRACE ipfs_embed::net::peers: address reach failure Pending connection: Transport error: upgrade apply error: invalid data
Machine-#0 (stdout): Jun 08 12:29:21.985 TRACE ipfs_embed::net::peers: removing address /ip4/192.168.39.3/tcp/30000/p2p/12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN for peer 12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN
Machine-#0 (stdout): Jun 08 12:29:21.985 TRACE ipfs_embed::net::peers: dial failure 12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN
Machine-#0 (stdout): Jun 08 12:29:21.985 TRACE ipfs_embed::net::peers: Unreachable(PeerId("12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN"))
Error: sim open failed |
So this isn't that easy to fix. I don't think it makes sense to have this split between UpgradeOutbound/UpgradeInbound. Since an UpgradeOutbound also needs to be able to do what UpgradeInbound does. @mixden what are your thoughts on this? |
anyway this is the approach I was pursuing, but giving up since the swarm types are too complicated and I'm not sure if this split makes sense. there should probably be a single +type Output<C, U> = <U as OutboundUpgrade<Negotiated<C>>>::Output;
+type Error<C, U> = <U as OutboundUpgrade<Negotiated<C>>>::Error;
+
impl<C, U> Future for OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>>,
+ <U as OutboundUpgrade<Negotiated<C>>>::Output: 'static,
+ <U as OutboundUpgrade<Negotiated<C>>>::Error: 'static,
+ <U as OutboundUpgrade<Negotiated<C>>>::Future: Send + Unpin + 'static,
+ U: InboundUpgrade<Negotiated<C>,
+ Output = <U as OutboundUpgrade<Negotiated<C>>>::Output,
+ Error = <U as OutboundUpgrade<Negotiated<C>>>::Error>,
+
+ <U as InboundUpgrade<Negotiated<C>>>::Future: Send + Unpin + 'static,
{
- type Output = Result<U::Output, UpgradeError<U::Error>>;
+ type Output = Result<Output<C, U>, UpgradeError<Error<C, U>>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) {
OutboundUpgradeApplyState::Init { mut future, upgrade } => {
- let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? {
+ let (info, role, connection) = match Future::poll(Pin::new(&mut future), cx)? {
Poll::Ready(x) => x,
Poll::Pending => {
self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
return Poll::Pending
}
};
- self.inner = OutboundUpgradeApplyState::Upgrade {
- future: Box::pin(upgrade.upgrade_outbound(connection, info.0))
+ let future: Pin<Box<dyn Future<Output = Result<Output<C, U>, Error<C, U>>> + Send + Unpin + 'static>> = match role {
+ multistream_select::SimOpenRole::Initiator => Box::pin(upgrade.upgrade_outbound(connection, info.0)),
+ multistream_select::SimOpenRole::Responder => Box::pin(upgrade.upgrade_inbound(connection, info.0)),
};
+ self.inner = OutboundUpgradeApplyState::Upgrade { future };
}
OutboundUpgradeApplyState::Upgrade { mut future } => {
match Future::poll(Pin::new(&mut future), cx) {
@@ -230,4 +243,3 @@ impl<N: ProtocolName> AsRef<[u8]> for NameWrap<N> {
self.0.protocol_name()
}
}
- |
I am in favor of that. It would suit protocols using However, for protocols where they differ, the associated types get more complex I reckon. |
I think we need to keep the InboundUpgrade/OutboundUpgrade. Instead I propose simply adding a role arg to the OutboundUpgrade and handling it in noise. |
Thinking out loud. Given that this only concerns connection upgrades and not substream upgrades and given that
|
I think that would affect the identify protocol. It's ok to have a connection from two dialers, it's only the noise handshake that needs to change. Quic handles this internally so shouldn't be relevant. |
Rereading the identify code, maybe it is not affected by this. I thought that the dialer asks the other party to identify themselves, but since this is a substream I guess either party can open an outbound substream. You're welcome to try something else, but I think I'll deploy my fix at actyx until we have something that works and is upstream. |
@mxinden I guess this PR can be merged, then we can continue discussion on how to hook it up on a different PR |
What would be the benefit of having this logic in |
Not much, but there isn't really any downside either. Making the diff between actyx libp2p and rust-libp2p smaller is an advantage for us. It's hard for us to estimate when and if a feature will be merged. We're releasing actyx 2.0.0 probably next week so ideally we would't rely on patching cargo.toml |
Status update: As mentioned above, while this pull request adds the core extension logic to I see multiple ways to do this, though I don't think any of them is ideal:
I will need to put more thoughts into this. Alternative suggestions are very much appreciated. In the long term I hope to solve this in a clean way within the ongoing Protocol Select efforts. More specifically I would like to find an abstraction that does not require a hack for a single protocol peculiarity namely TCP's simultaneous open "feature". |
@dvc94ch would you mind expanding on why you need this feature? Are you seeing uncoordinated, as in not timed for hole punching, simultaneous TCP opens in the wild? If so, do you have metrics on how often this is happening in your environment? |
we don't have metrics on how often this is happening. but it has happened and fixed it (even though arguably in a suboptimal way). and therefore isn't happening anymore. But once you account for roundtrip time, it is much more likely to happen in the real world than in a test environment. so if you have a path delay of 100ms there is quite a big time slot where this could happen. I'm not sure why it depends on relay v2 or dcutr, I think this is an independent feature/issue. And the sim open spec is stable (for better or worse) so this shouldn't change even if relay v2 and dcutr change. |
Also did you see my comments about the relay in the ipfs-embed issue you commented on? Hope some were useful, but I think the relay v2 will take a while. |
Can you expand on this? Yes, the longer the path delay the higher the chance that a simultaneous open happens. But even with a 100ms delay, I would judge the probability to be low for two nodes to accidentally dial each other in the same time window. I would need more data in order to justify the trade-off to merge a somewhat half-baked solution, unless we come up with a better design how to integrate it into rust-libp2p. |
So we've released v2. I'm on vacation this week and we have team meetings next week. I'll see if I have some time to bake it a bit more after that. |
@mxinden Consider a scenario where gossipsub is used for peer discovery: node A sends its newest info to B and C, who are currently not connected since they didn’t know about each other. Following the received gossip B and C connect to each other, in which case we have observed it to be quite likely for a simultaneous open to occur. |
…st-libp2p#2066 additionally fork all git dependencies into the Actyx org
Any progress on this? At actyx we have had to fork rust-libp2p to make things work, and we would really like to be able to stop doing that... |
Sorry for the delay / me not communicating well here and sorry for the trouble this is causing on your end. I said earlier that I will include this pull request in
I will test out #2250 before making any decision here. |
Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: libp2p#2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! 💯
```rust | ||
my_transport.upgrade() | ||
.authenticate(my_authentication) | ||
.multiplex(my_multiplexer) | ||
``` | ||
- `Version::V1Lazy` | ||
|
||
```rust | ||
my_transport.upgrade() | ||
.authenticate_with_version(my_authentication, Version::V1Lazy) | ||
.multiplex_with_version(my_multiplexer, Version::V1Lazy) | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's worth adding the explicit _with_version
methods. I'd just change the signatures of authenticate
and multiplex
. Version
already provides a Default
impl.
Pin<Box<<U as InboundUpgrade<Negotiated<C>>>::Future>>, | ||
>, | ||
}, | ||
Undefined, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not make this an Option
instead of the additional enum member?
/// This multistream-select variant is specified in [1]. | ||
/// | ||
/// Note: [`Version::V1SimultaneousOpen`] should only be used (a) on transports that allow | ||
/// simultaneously opened connections, e.g. TCP with socket reuse and (2) during the first |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// simultaneously opened connections, e.g. TCP with socket reuse and (2) during the first | |
/// simultaneously opened connections, e.g. TCP with socket reuse and (b) during the first |
/// | ||
/// Note: [`Version::V1SimultaneousOpen`] should only be used (a) on transports that allow | ||
/// simultaneously opened connections, e.g. TCP with socket reuse and (2) during the first | ||
/// negotiation on the connection, most likely the secure channel protocol negotiation. In all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYM with first negotiation on the connection?
} | ||
|
||
#[test] | ||
fn simultaneous_open() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to add a "full stack test"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left a few comments here and there. I guess my biggest question would be whether or not we need to "integrate" sim-open this tightly with the existing multistream-select implementation?
If we would offer a dedicated upgrade_sim_open
function, we might get away with creating a dedicated dialer_select_sim_open
that internally first does the sim-open dance and then continues with regular multistream-select (which we can already start in an arbitrary state).
However, I don't know how painful this would be in libp2p-core (i.e. would it introduce much duplication among the Authenticated
etc structs?).
Side-note: multistream-select
could really benefit from async-await
but I guess as long as we want to name the return type, this is not going to happen 😅
|
||
```rust | ||
my_transport.upgrade() | ||
.authenticate_with_version(my_authentication, Version::V1Lazy) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a different type, isn't it?
.authenticate_with_version(my_authentication, Version::V1Lazy) | |
.authenticate_with_version(my_authentication, AuthenticationVersion::V1Lazy) |
my_transport.upgrade() | ||
.authenticate(my_authentication) | ||
.multiplex(my_multiplexer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered adding a .upgrade_sim_open()
API instead of changing the existing one? That would:
a) be backwards compatible
b) perhaps avoid the trickery around two Version
enums?
// Simultaneous open protocol extension | ||
SendSimOpen { | ||
io: MessageIO<R>, | ||
protocol: Option<N>, | ||
}, | ||
FlushSimOpen { | ||
io: MessageIO<R>, | ||
protocol: N, | ||
}, | ||
AwaitSimOpen { | ||
io: MessageIO<R>, | ||
protocol: N, | ||
}, | ||
SimOpenPhase { | ||
selection: SimOpenPhase<R>, | ||
protocol: N, | ||
}, | ||
Responder { | ||
responder: crate::ListenerSelectFuture<R, N>, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels a bit forced into here and could potentially be a bit more modularised but I think that would come at the cost of a lot of code duplication downstream because we would return different futures?
@@ -74,7 +112,7 @@ where | |||
.protocol_info() | |||
.into_iter() | |||
.map(NameWrap as fn(_) -> NameWrap<_>); | |||
let future = multistream_select::dialer_select_proto(conn, iter, v); | |||
let future = multistream_select::dialer_select_proto(conn, iter, v.into()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was considering suggesting to create a dedicated dialer_sim_open_select_proto
function but I guess that idea is not particularly good because we rely on being able to name all these futures here?
// Start over. | ||
self.state = SimOpenState::SendNonce { io }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The spec says we should fail in this case!
In the unlikely case where both peers selected the same integer, connection establishment fails.
let result = match local_role { | ||
Role::Initiator if remote_msg == Message::Responder => Ok((io, local_role)), | ||
Role::Responder if remote_msg == Message::Initiator => Ok((io, local_role)), | ||
|
||
_ => Err(ProtocolError::InvalidMessage.into()), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find matching on a tuple easier to read in such circumstances.
let result = match local_role { | |
Role::Initiator if remote_msg == Message::Responder => Ok((io, local_role)), | |
Role::Responder if remote_msg == Message::Initiator => Ok((io, local_role)), | |
_ => Err(ProtocolError::InvalidMessage.into()), | |
}; | |
let result = match (local_role, remote_msg) { | |
(Role::Initiator, Message::Responder) => Ok((io, local_role)), | |
(Role::Responder, Message::Initiator) => Ok((io, local_role)), | |
_ => Err(ProtocolError::InvalidMessage.into()), | |
}; |
Ordering::Greater => { | ||
self.state = SimOpenState::SendRole { | ||
io, | ||
local_role: Role::Initiator, | ||
}; | ||
} | ||
Ordering::Less => { | ||
self.state = SimOpenState::SendRole { | ||
io, | ||
local_role: Role::Responder, | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could lift the assignment to self.state
out of the match
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Together with actually failing on Ordering::Equal
, you could write:
let local_role = match local_nonce.cmp(&remote_nonce) {
Ordering::Equal => {
return Poll::Ready(Err(...))
}
Ordering::Greater => Role::Initiator,
Ordering::Less => Role::Responder
};
self.state = SimOpenState::SendRole { io, local_role };
SimOpenState::FlushNonce { | ||
mut io, | ||
local_nonce, | ||
} => match Pin::new(&mut io).poll_flush(cx)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know about the _unpin
APIs in SinkExt
?
Maybe we can use them here to avoid a bit of boilerplate.
}; | ||
|
||
match msg { | ||
Message::Header(v) if v == HeaderLine::from(*this.version) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is equivalent I think? Given that SimOpen
implies V1
, I find it awkward to parse the version back into the headerline.
Message::Header(v) if v == HeaderLine::from(*this.version) => { | |
Message::Header(HeaderLine::V1) => { |
async fn run(version: Version) { | ||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
let listener_addr = listener.local_addr().unwrap(); | ||
|
||
let server = async move { | ||
let connec = listener.accept().await.unwrap().0; | ||
let protos = vec![b"/proto1", b"/proto2"]; | ||
let (proto, io, _) = dialer_select_proto_serial(connec, protos, version) | ||
.await | ||
.unwrap(); | ||
assert_eq!(proto, b"/proto2"); | ||
io.complete().await.unwrap(); | ||
}; | ||
|
||
let client = async move { | ||
let connec = TcpStream::connect(&listener_addr).await.unwrap(); | ||
let protos = vec![b"/proto3", b"/proto2"]; | ||
let (proto, io, _) = dialer_select_proto_serial(connec, protos.into_iter(), version) | ||
.await | ||
.unwrap(); | ||
assert_eq!(proto, b"/proto2"); | ||
io.complete().await.unwrap(); | ||
}; | ||
|
||
futures::future::join(server, client).await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could just be an async move
block inside block_on
unless we want to add more tests?
Great to see movement here! I’m not married to this (or any other solution), but I’d like to get rid of our Actyx forks of libp2p, for which I’d need some way to express a fix to this bug (remember that sim-open does happen in our system, without hole punching, due to correlated peer discovery). I’m willing to help, what is the roadmap? |
FYI: I’m moving Actyx (and ipfs-embed) away from this patch. The alternative solution is to not use PortReuse::Enabled and instead manage remote addresses a bit more carefully — thankfully we have the Identify protocol which provides the needed information. |
can you elaborate on that? you can never prevent two peers dialing simultaneously |
Without port reuse, these will be two completely separate connections, with no confusion over who is dialler and who is listener. |
Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: #2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
@mxinden: I guess this PR is abandoned? In that case it should probably be closed. |
With #2363 we won't need this pull request for hole punching. I am still hesitant to support Multistream Select Sim Open in rust-libp2p. See reasoning in #2066 (comment). With that in mind, I am closing here.
Thanks for the ping @dvc94ch. Sorry @wngr and @thomaseizinger to not address your PR feedback.
Agreed. Related: I think we should make sure we can use |
Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: libp2p/rust-libp2p#2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: libp2p#2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
From the multistream-select 1.0 simultaneous open protocol extension
specification:
See libp2p/specs#196 for details.
This commit implements the above specification, available via
Version::V1SimultaneousOpen
.Given that the extension is backward compatible, there is no gain for rust-libp2p to support it without supporting the larger effort of TCP hole punching.
To test this against the Golang implementation: